package cn.yanceysong.catalog;


import java.util.concurrent.ExecutionException;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;

/**
 * Demo of a Flink JDBC catalog: registers a catalog named {@code my_jdbc_catalog},
 * switches to it, queries the {@code users} table, inserts one row, and queries again.
 *
 * <p>The connection settings (URL, user, password, default database) are hard-coded in
 * the {@code CREATE CATALOG} statement below.
 */
public class FlinkCataLogDemo {
    /**
     * Runs the demo end to end and prints both query results to standard output.
     *
     * @param args command-line arguments; not used
     * @throws IllegalStateException if waiting for the INSERT job is interrupted or the job fails
     */
    public static void main(String[] args) {
        // Create the TableEnvironment with default settings.
        EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // Register the JDBC catalog.
        // NOTE(review): credentials are hard-coded here; acceptable for a local demo only.
        tableEnv.executeSql(
                "CREATE CATALOG my_jdbc_catalog WITH (\n" +
                        "    'type' = 'jdbc',\n" +
                        "    'default-database' = 'flink_test',\n" +
                        "    'username' = 'root',\n" +
                        "    'password' = '123456',\n" +
                        "    'base-url' = 'jdbc:mysql://localhost:3306'\n" +
                        ")"
        );

        // Make the JDBC catalog the current catalog so unqualified table names resolve in it.
        tableEnv.executeSql("USE CATALOG my_jdbc_catalog");

        // Query the users table and print the matching rows.
        TableResult result = tableEnv.executeSql("SELECT * FROM users WHERE age > 30");
        result.print();

        // Insert a new row. executeSql only submits the INSERT job, so wait for it to
        // finish; otherwise the follow-up SELECT may run before the row is written.
        TableResult insertResult = tableEnv.executeSql(
                "INSERT INTO users (id, name, age) VALUES (4, 'David', 40)"
        );
        awaitCompletion(insertResult);

        // Query again to show the table contents after the insert.
        result = tableEnv.executeSql("SELECT * FROM users");
        result.print();
    }

    /**
     * Blocks until the job behind {@code result} has finished.
     *
     * @param result the result handle returned by {@code executeSql} for the INSERT statement
     * @throws IllegalStateException if the wait is interrupted or the job fails
     */
    private static void awaitCompletion(TableResult result) {
        try {
            result.await();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can observe it.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the INSERT job to finish", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("INSERT job failed", e);
        }
    }
}
